View Javadoc
1   package org.apache.maven.surefire.junitcore.pc;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.surefire.report.ConsoleLogger;
23  import org.junit.runner.Description;
24  import org.junit.runners.model.RunnerScheduler;
25  
26  import java.io.ByteArrayOutputStream;
27  import java.io.PrintStream;
28  import java.util.Collection;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.CopyOnWriteArraySet;
32  import java.util.concurrent.RejectedExecutionException;
33  import java.util.concurrent.RejectedExecutionHandler;
34  import java.util.concurrent.ThreadPoolExecutor;
35  
36  /**
37   * Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
38   * a master scheduler can shutdown slaves.
39   * <p/>
40   * The scheduler objects should be first created (and wired) and set in runners
41   * {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
42   * <p/>
43   * A new instance of scheduling strategy should be passed to the constructor of this scheduler.
44   *
45   * @author Tibor Digana (tibor17)
46   * @since 2.16
47   */
48  public class Scheduler
49      implements RunnerScheduler
50  {
51      private final Balancer balancer;
52  
53      private final SchedulingStrategy strategy;
54  
55      private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();
56  
57      private final Description description;
58  
59      private final ConsoleLogger logger;
60  
61      private volatile boolean shutdown = false;
62  
63      private volatile boolean started = false;
64  
65      private volatile boolean finished = false;
66  
67      private volatile Controller masterController;
68  
69      /**
70       * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
71       * <p/>
72       * You can use it with one infinite thread pool shared in strategies across all
73       * suites, class runners, etc.
74       */
75      public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy )
76      {
77          this( logger, description, strategy, -1 );
78      }
79  
80      /**
81       * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
82       * <p/>
83       * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
84       * {@link Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy)}
85       * or {@link Scheduler(ConsoleLogger, org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
86       *
87       * @param logger current logger implementation
88       * @param description description of current runner
89       * @param strategy    scheduling strategy with a shared thread pool
90       * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
91       * @throws NullPointerException if null <tt>strategy</tt>
92       */
93      public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, int concurrency )
94      {
95          this( logger, description, strategy, BalancerFactory.createBalancer( concurrency ) );
96      }
97  
98      /**
99       * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
100      * against other groups of schedulers. The schedulers share one pool.
101      * <p/>
102      * Unlike in {@link Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)} which was
103      * limiting the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em>
104      * <tt>balancer</tt> is limiting the concurrency of all children in runners having schedulers created by this
105      * constructor.
106      *
107      * @param logger current logger implementation
108      * @param description description of current runner
109      * @param strategy    scheduling strategy which may share threads with other strategy
110      * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
111      * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
112      */
113     public Scheduler( ConsoleLogger logger, Description description, SchedulingStrategy strategy, Balancer balancer )
114     {
115         strategy.setDefaultShutdownHandler( newShutdownHandler() );
116         this.logger = logger;
117         this.description = description;
118         this.strategy = strategy;
119         this.balancer = balancer;
120         masterController = null;
121     }
122 
123     /**
124      * Can be used by e.g. a runner having parallel classes in use case with parallel
125      * suites, classes and methods sharing the same thread pool.
126      *
127      * @param logger current logger implementation
128      * @param description     description of current runner
129      * @param masterScheduler scheduler sharing own threads with this slave
130      * @param strategy        scheduling strategy for this scheduler
131      * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
132      * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
133      */
134     public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
135                       SchedulingStrategy strategy, Balancer balancer )
136     {
137         this( logger, description, strategy, balancer );
138         strategy.setDefaultShutdownHandler( newShutdownHandler() );
139         masterScheduler.register( this );
140     }
141 
142     /**
143      * @param masterScheduler a reference to
144      * {@link Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)}
145      *                        or {@link Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)}
146      * @see Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy)
147      * @see Scheduler(ConsoleLogger, org.junit.runner.Description, SchedulingStrategy, int)
148      */
149     public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
150                       SchedulingStrategy strategy, int concurrency )
151     {
152         this( logger, description, strategy, concurrency );
153         strategy.setDefaultShutdownHandler( newShutdownHandler() );
154         masterScheduler.register( this );
155     }
156 
157     /**
158      * Should be used with individual pools on suites, classes and methods, see
159      * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
160      * <p/>
161      * Cached thread pool is infinite and can be always shared.
162      */
163     public Scheduler( ConsoleLogger logger, Description description, Scheduler masterScheduler,
164                       SchedulingStrategy strategy )
165     {
166         this( logger, description, masterScheduler, strategy, 0 );
167     }
168 
169     private void setController( Controller masterController )
170     {
171         if ( masterController == null )
172         {
173             throw new NullPointerException( "null ExecutionController" );
174         }
175         this.masterController = masterController;
176     }
177 
178     /**
179      * @param slave a slave scheduler to register
180      * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
181      */
182     private boolean register( Scheduler slave )
183     {
184         boolean canRegister = slave != null && slave != this;
185         if ( canRegister )
186         {
187             Controller controller = new Controller( slave );
188             canRegister = !slaves.contains( controller );
189             if ( canRegister )
190             {
191                 slaves.add( controller );
192                 slave.setController( controller );
193             }
194         }
195         return canRegister;
196     }
197 
198     /**
199      * @return <tt>true</tt> if new tasks can be scheduled.
200      */
201     private boolean canSchedule()
202     {
203         return !shutdown && ( masterController == null || masterController.canSchedule() );
204     }
205 
206     protected void logQuietly( Throwable t )
207     {
208         ByteArrayOutputStream out = new ByteArrayOutputStream();
209         PrintStream stream = new PrintStream( out );
210         t.printStackTrace( stream );
211         stream.close();
212         logger.info( out.toString() );
213     }
214 
215     protected void logQuietly( String msg )
216     {
217         logger.info( msg );
218     }
219 
220     /**
221      * Attempts to stop all actively executing tasks and immediately returns a collection
222      * of descriptions of those tasks which have started prior to this call.
223      * <p/>
224      * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
225      * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
226      *
227      * @param stopNow if <tt>true</tt> interrupts waiting test methods
228      * @return collection of descriptions started before shutting down
229      */
230     protected ShutdownResult describeStopped( boolean stopNow )
231     {
232         Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
233         Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
234         stop( executedTests, incompleteTests, false, stopNow );
235         return new ShutdownResult( executedTests, incompleteTests );
236     }
237 
238     /**
239      * Stop/Shutdown/Interrupt scheduler and its children (if any).
240      *
241      * @param executedTests       Started tests which have finished normally or abruptly till called this method.
242      * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
243      * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
244      *                            When the runner of
245      *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
246      *                            is finished in
247      *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
248      *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
249      *                            See the unit test <em>ParallelComputerBuilder#timeoutAndForcedShutdown()</em>.
250      * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
251      *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
252      *                            {@link Thread#interrupt()}.
253      */
254     private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
255                        boolean tryCancelFutures, boolean stopNow )
256     {
257         shutdown = true;
258         try
259         {
260             if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
261             {
262                 if ( executedTests != null )
263                 {
264                     executedTests.add( description );
265                 }
266 
267                 if ( incompleteTests != null && !finished )
268                 {
269                     incompleteTests.add( description );
270                 }
271             }
272 
273             for ( Controller slave : slaves )
274             {
275                 slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
276             }
277         }
278         finally
279         {
280             try
281             {
282                 balancer.releaseAllPermits();
283             }
284             finally
285             {
286                 if ( stopNow )
287                 {
288                     strategy.stopNow();
289                 }
290                 else if ( tryCancelFutures )
291                 {
292                     strategy.stop();
293                 }
294                 else
295                 {
296                     strategy.disable();
297                 }
298             }
299         }
300     }
301 
302     protected boolean shutdownThreadPoolsAwaitingKilled()
303     {
304         if ( masterController == null )
305         {
306             stop( null, null, true, false );
307             boolean isNotInterrupted = true;
308             if ( strategy != null )
309             {
310                 isNotInterrupted = strategy.destroy();
311             }
312             for ( Controller slave : slaves )
313             {
314                 isNotInterrupted &= slave.destroy();
315             }
316             return isNotInterrupted;
317         }
318         else
319         {
320             throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
321         }
322     }
323 
324     protected void beforeExecute()
325     {
326     }
327 
328     protected void afterExecute()
329     {
330     }
331 
332     public void schedule( Runnable childStatement )
333     {
334         if ( childStatement == null )
335         {
336             logQuietly( "cannot schedule null" );
337         }
338         else if ( canSchedule() && strategy.canSchedule() )
339         {
340             try
341             {
342                 boolean isNotInterrupted = balancer.acquirePermit();
343                 if ( isNotInterrupted && !shutdown )
344                 {
345                     Runnable task = wrapTask( childStatement );
346                     strategy.schedule( task );
347                     started = true;
348                 }
349             }
350             catch ( RejectedExecutionException e )
351             {
352                 stop( null, null, true, false );
353             }
354             catch ( Throwable t )
355             {
356                 balancer.releasePermit();
357                 logQuietly( t );
358             }
359         }
360     }
361 
362     public void finished()
363     {
364         try
365         {
366             strategy.finished();
367         }
368         catch ( InterruptedException e )
369         {
370             logQuietly( e );
371         }
372         finally
373         {
374             finished = true;
375         }
376     }
377 
378     private Runnable wrapTask( final Runnable task )
379     {
380         return new Runnable()
381         {
382             public void run()
383             {
384                 try
385                 {
386                     beforeExecute();
387                     task.run();
388                 }
389                 finally
390                 {
391                     try
392                     {
393                         afterExecute();
394                     }
395                     finally
396                     {
397                         balancer.releasePermit();
398                     }
399                 }
400             }
401         };
402     }
403 
404     protected ShutdownHandler newShutdownHandler()
405     {
406         return new ShutdownHandler();
407     }
408 
409     /**
410      * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
411      */
412     private final class Controller
413     {
414         private final Scheduler slave;
415 
416         private Controller( Scheduler slave )
417         {
418             this.slave = slave;
419         }
420 
421         /**
422          * @return <tt>true</tt> if new children can be scheduled.
423          */
424         boolean canSchedule()
425         {
426             return Scheduler.this.canSchedule();
427         }
428 
429         void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
430                    boolean tryCancelFutures, boolean shutdownNow )
431         {
432             slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
433         }
434 
435         /**
436          * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
437          */
438         boolean destroy()
439         {
440             return slave.strategy.destroy();
441         }
442 
443         @Override
444         public int hashCode()
445         {
446             return slave.hashCode();
447         }
448 
449         @Override
450         public boolean equals( Object o )
451         {
452             return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
453         }
454     }
455 
456     /**
457      * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
458      * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
459      * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
460      * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
461      * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
462      * schedulers recursively as well.
463      */
464     public class ShutdownHandler
465         implements RejectedExecutionHandler
466     {
467         private volatile RejectedExecutionHandler poolHandler;
468 
469         protected ShutdownHandler()
470         {
471             poolHandler = null;
472         }
473 
474         public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
475         {
476             this.poolHandler = poolHandler;
477         }
478 
479         public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
480         {
481             if ( executor.isShutdown() )
482             {
483                 Scheduler.this.stop( null, null, true, false );
484             }
485             final RejectedExecutionHandler poolHandler = this.poolHandler;
486             if ( poolHandler != null )
487             {
488                 poolHandler.rejectedExecution( r, executor );
489             }
490         }
491     }
492 }